apache

您所在的位置:网站首页 kafka has passed since apache

apache

2024-05-15 15:07| 来源: 网络整理| 查看: 265

我正在使用 kafka_2.11-2.1.1和生产者使用 spring 2.1.0.RELEASE。

我在向 Kafka 主题发送消息时正在使用 spring,我的生产者生成了很多 TimeoutExceptions

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for COMPANY_INBOUND--19: 229 ms has passed since batch creation plus linger time 我正在使用以下 kafka 生产者设置acks: 1 retries: 1 batchSize: 100 lingerMs: 5 bufferMemory: 33554432 requestTimeoutMs: 60 我尝试了很多组合(特别是 batchSize 和 lingerMs )但没有任何效果。任何帮助请问上述场景的设置应该是什么。

使用以下配置再次尝试......但没有运气同样的错误

acks = 1 batch.size = 15 buffer.memory = 33554432 client.id = compression.type = none connections.max.idle.ms = 540000 enable.idempotence = false interceptor.classes = [] key.serializer = class org.apache.kafka.common.serialization.StringSerializer linger.ms = 0 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class com.spgmi.ca.prescore.partition.CompanyInfoPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 120 retries = 1 第二次运行:

我尝试了不同的组合,但没有任何效果。因此我认为这会是网络、SSL 等的问题。所以我在生产者运行的同一台机器上安装并运行了 Kafka,即在我的本地计算机上。

我尝试再次运行指向本地 Kafka 主题的生产者。但没有运气同样的问题。

下面是使用的配置参数。

2019-07-02 05:55:36.663 INFO 9224 --- [lt-dispatcher-2] o.a.k.clients.producer.ProducerConfig : ProducerConfig values: acks = 1 batch.size = 0 bootstrap.servers = [localhost:9092] request.timeout.ms = 60 retries = 1 buffer.memory = 33554432 linger.ms = 0 client.id = compression.type = none connections.max.idle.ms = 540000 enable.idempotence = false interceptor.classes = [] max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null 面临同样的错误:org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for inbound_topic--1: 69 ms 自批处理创建以来已经过去了加上逗留时间

也试过批量大小 5、10 和 0linger_ms 0 、 5 、 10 等。request_time_out 0 、 45 、 60 、 120 、 300 等。

没有任何作用......同样的错误。

我还应该尝试什么,可能的解决方案是什么?

如何避免负键生成

是的,我设置了本地设置并打印带有分区信息的日志,如下所示

2019-07-03 02:48:28.822 INFO 7092 --- [lt-dispatcher-2] c.s.c.p.p.CompanyInfoPartitioner:主题:inbound_topic Key = 597736248-Entropy Cayman Solar Ltd.-null-null-679 |分区 = -1 2019-07-03 02:48:28.931 错误 7092 --- [广告 | producer-1] osksupport.LoggingProducerListener :发送带有 key='597736248-Entropy Cayman Solar Ltd.-null-null-null' 和 payload='com.spgmi.ca.prescore.model.Company@8b12343 的消息时抛出异常' 到主题 inbound_topic :

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for inbound_topic --1: 104 ms 自批量创建以来已经过去了加上逗留时间

我的主题 inbound_topic 有两个分区,如下所示C:\Software\kafka\kafka_2.11-2.1.1\bin\windows>kafka-topics.bat --describe --zookeeper localhost:2181 --topic inbound_topic主题:inbound_topic PartitionCount:2 ReplicationFactor:1 配置: 主题:inbound_topic 分区:0 领导者:0 副本:0 Isr:0 主题:inbound_topic 分区:1 领导者:0 副本:0 Isr:0

但是我的制作人似乎试图发送到 Partition = -1。

我的分区逻辑如下

int p = (((String)key).hashCode() * Integer.MAX_VALUE) % numPartitions; logger.info("Topic : "+ topic + "\t Key = " + (String)key + " Partition = " + p ); 关键是我在做 hashCode()。这里需要纠正什么以避免这个负数分区号?即分区 = -1

我的分区键逻辑应该是什么样的?

任何帮助高度appriciated。

最佳答案

该错误表明某些记录以比从客户端发送的速度更快的速度放入队列。当您的 Producer 发送消息时,它们会存储在缓冲区中(在将它们发送到目标代理之前),并且将记录分组在一起以增加吞吐量。当新记录添加到批处理中时,它必须在由 request.timeout.ms 控制的可配置时间窗口内发送。 (默认设置为 30 秒)。如果批次在队列中的时间更长,则 TimeoutException抛出,然后批处理记录将从队列中删除,并且不会传递给代理。增加 request.timeout.ms 的值应该为你做的伎俩。

如果这不起作用,您也可以尝试减少 batch.size以便更频繁地发送批次(但这次将包含更少的消息)并确保 linger.ms设置为 0(这是默认值)。请注意,您需要在更改任何配置参数后重新启动 kafka 代理。如果您仍然收到错误,我认为您的网络出现问题。您是否启用了 SSL?

关于apache-kafka - 如何修复 kafka.common.errors.TimeoutException : Expiring 1 record(s) xxx ms has passed since batch creation plus linger time,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56807188/



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3